我們講到 Speed Layer 就像一隊外送員,專門把 Kafka 新鮮出爐的訂單火速送到資料庫。但是,衝得快不代表不會翻車。今天我們來看第三種真實會發生的「外送災難」:訂單堆爆。
想像一下,你是物流平台的調度員。剛開始一切都很順利:
看起來很完美,對吧?但突然間…
雙十一購物節!
你的外送員開始在配送中心門口大排長龍,包裹越堆越多,記憶體開始爆炸,系統即將崩潰…
這不是虛構的故事,這是每個流處理工程師都會遇到的經典問題:背壓(Backpressure)。
當你的 PostgreSQL 因為各種原因變慢時:
你的流處理系統就會開始「消化不良」,最終可能發生任何問題而掛掉。
很多團隊第一反應是:
「開機器啊!加 CPU、加 RAM、加 DB 寫入節點!」
雖然短期有效,但你永遠追不上流量高峰,因為促銷活動可能瞬間多 100 倍訂單,無限加錢不現實。
今天我們要為我們的 Simple Streaming 框架實作一個背壓機制。核心概念很簡單:
當下游處理不過來時,讓上游聰明地「踩煞車」
重要提醒
本文所有程式碼皆為教學與概念演示用的 Pseudo Code,可以幫助你理解並解決實際場景的問題,但這裡的程式碼不能直接使用,主要目的是用來講解 Stream Processing 的設計思路與核心概念。閱讀時建議把重點放在理解整體架構和設計邏輯上,程式碼細節部分可以輕鬆看過就好。
class StreamingOverloadException(Exception):
"""
當系統過載時拋出的異常
"""
def __init__(self, message: str, pause_seconds: float = 3.0):
super().__init__(message)
self.pause_seconds = pause_seconds # 建議休息時間
我們的 PostgreSQL Sink 需要變聰明,學會監控自己的處理能力。讓我們一步步解析背壓機制的關鍵設計:
我們的 PostgreSQL Sink 需要學會監控自己的健康狀態。讓我們看看它是如何工作的:
Normal Processing Flow (正常處理流程):
Step 1: Data Arrives
┌─────────────────┐
│ New Message │ ──► write() method
└─────────────────┘
Step 2: Buffer Management
┌─────────────────┐ YES ┌──────────────────────┐
│ Buffer >= 100? │ ────────►│ _flush_and_clear() │
└─────────────────┘ └──────────────────────┘
│ NO │
▼ ▼
┌─────────────────┐ ┌──────────────────────┐
│ Add to Buffer │ │ Start Timer │
└─────────────────┘ └──────────────────────┘
Step 3: Performance Monitoring
┌──────────────────────┐ ┌──────────────────────┐
│ Execute Batch │────►│ Calculate Duration │
│ Insert to DB │ │ (end_time - start) │
└──────────────────────┘ └──────────────────────┘
│
▼
┌──────────────────────┐
│ _check_overload() │
│ Performance Judge │
└──────────────────────┘
設計思路說明:
Step 1 - 數據入口:每個新訊息都通過 write()
方法進入系統。
def write(self, message: Dict[str, Any]):
"""將 message 加入 buffer,達到 batch_size 時批次寫入"""
# 從 message 取得實際資料(延續 Day 6 設計)
data = message.get('value', {})
if not data:
return
# 加入 buffer
self._buffer.append(data)
# 達到批次大小,批量寫入
if len(self._buffer) >= self.batch_size:
self._flush_and_clear() # 觸發 Step 2
Step 2 - 智能緩衝:當 buffer 達到 100 筆時,觸發 _flush_and_clear()
。這裡是背壓檢測的關鍵入口。
Step 3 - 關鍵創新:在 _flush_and_clear()
中添加計時器,讓系統感知自己的處理速度:
def _flush_and_clear(self):
"""批次寫入資料,順便當個「身體健康監測器」"""
if not self._buffer:
return
# 關鍵:開始計時(對應圖中 "Start Timer")
flush_start_time = time.time()
buffer_size = len(self._buffer)
try:
# 執行批次寫入到 PostgreSQL(對應圖中 "Execute Batch Insert")
# ... 執行批量插入邏輯
# 計算處理時間(對應圖中 "Calculate Duration")
flush_duration = time.time() - flush_start_time
# 關鍵:檢查是否過載(對應圖中 "_check_overload()")
self._check_overload(flush_duration, buffer_size)
# 清空 buffer
self._buffer.clear()
except Exception as e:
logger.error(f"批次寫入失敗: {e}")
raise
關鍵在於如何判斷系統「真的累了」,而不是偶爾的性能波動。我們採用了「三振出局」策略:
Overload Detection Logic (過載檢測邏輯):
┌──────────────────────┐
│ Duration > 2.0s ? │
└──────────────────────┘
│
┌─────────────────┐
│ YES │ NO │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ slow_count++│ │ slow_count--│
└─────────────┘ └─────────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│Count >= 3 ? │ │ Keep Normal │
└─────────────┘ │ Processing │
│ └─────────────┘
▼ YES
┌─────────────────────────────────────┐
│ BACKPRESSURE TRIGGER │
│ │
│ pause_time = 5.0s │
│ throw StreamingOverloadException │
└─────────────────────────────────────┘
設計思路說明:
閾值設定(2.0秒):為什麼選擇 2 秒?在正常情況下,批次寫入 100 筆記錄應該在幾百毫秒內完成。2 秒已經是明顯的性能異常信號。這個閾值可以根據實際硬體和資料庫配置調整。
計數器機制(slow_count):單次慢速處理可能只是偶發情況(網路抖動、其他查詢競爭等)。我們使用計數器來區分「偶發慢速」和「持續過載」:
三振出局策略:連續 3 次慢速處理才觸發背壓。這個設計避免了過於敏感的誤報,同時能快速響應真正的系統過載。
固定暫停時間:5 秒的暫停時間足夠讓大部分暫時性問題(如短期鎖競爭、記憶體 GC 等)自行恢復,同時不會讓系統停頓太久。
首先,初始化背壓參數:
def __init__(self, batch_size: int = 100):
...
# 新增:背壓檢測參數(對應上方圖解中的各個閾值)
self._slow_flush_count = 0 # 計數器:追蹤連續慢速處理次數
self._max_slow_flush = 3 # 觸發閾值:連續 3 次就觸發背壓
self._slow_flush_threshold = 2.0 # 時間閾值:超過 2 秒算慢速處理
然後,對應的檢測邏輯實現:
def _check_overload(self, flush_duration: float):
"""檢測是否過載 - 就像外送員檢查自己是不是太累了"""
# 步驟 1:檢查處理時間是否超過閾值(對應圖中 "Duration > 2.0s ?")
if flush_duration > self._slow_flush_threshold: # 2.0 秒閾值
# 步驟 2:增加慢速計數(對應圖中 "slow_count++")
self._slow_flush_count += 1
logger.warning(f"處理變慢了: {flush_duration:.2f}s (第 {self._slow_flush_count} 次)")
# 步驟 3:檢查是否達到觸發條件(對應圖中 "Count >= 3 ?")
if self._slow_flush_count >= self._max_slow_flush: # 連續 3 次
# 步驟 4:觸發背壓機制(對應圖中 "BACKPRESSURE TRIGGER")
error_msg = f"PostgreSQL 處理過載:連續 {self._slow_flush_count} 次慢速處理"
pause_time = 5.0 # 固定休息 5 秒,簡單易懂
self._slow_flush_count = 0 # 重置計數器
# 拋出過載異常,觸發背壓機制!
raise StreamingOverloadException(error_msg, pause_seconds=pause_time)
else:
# 處理速度恢復正常,給點鼓勵(對應圖中 "slow_count--")
self._slow_flush_count = max(0, self._slow_flush_count - 1)
現在我們需要讓 SimpleStreamingEngine 學會處理這個「求救信號」。讓我們分析應用層的背壓響應機制:
SimpleStreamingEngine 需要在訊息處理的入口就進行背壓檢查,確保不會遺失任何數據:
Step 1: Message Processing Entry Point
┌─────────────────────────────────────┐
│ New Message Arrives │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ message_handler() │
│ │
│ First Check: _should_pause()? │
└─────────────────────────────────────┘
│
┌──────────────────────┐
│ YES │ NO │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Pause Kafka │ │ Process Message │
│ Consumer │ │ Normally │
│ │ │ │
└─────────────────┘ └─────────────────┘
│
▼
┌─────────────────────┐
│ Exception Handler │
└─────────────────────┘
Step 1 - 訊息處理入口:每個來自 Kafka 的訊息都會通過 _create_message_handler()
創建的處理器:
def _create_message_handler(self, source):
"""創建訊息處理器 - 加上智能煞車功能"""
def message_handler(message):
# 第一道防線:檢查系統狀態(對應圖中 "_should_pause()?")
if self._should_pause():
logger.debug("因背壓暫停,暫停 Kafka 消費")
self._pause_kafka_consumer(source) # 對應圖中 "Pause Kafka Consumer"
return
# 正常處理流程(對應圖中 "Process Message Normally")
try:
df = self._source_dataframe_map[source]
df.process_message(message)
except StreamingOverloadException as e:
# 異常處理:收到下游求救信號(對應圖中 "Exception Handler")
self._handle_overload(e, source)
except Exception as e:
logger.error(f"其他錯誤: {e}")
return message_handler
關鍵設計思路:
當 PostgreSQL Sink 拋出 StreamingOverloadException 時,整個 SimpleStreamingEngine 進入緊急狀態:
Normal Processing State:
┌─────────────────────────────────────┐
│ _paused = False │
│ Normal Processing │
└─────────────────────────────────────┘
│
▼ StreamingOverloadException
┌─────────────────────────────────────┐
│ EMERGENCY BRAKE │
│ │
│ _paused = True │
│ _pause_until = now + pause_seconds │
│ _overload_count++ │
└─────────────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ PAUSED STATE │
│ │
│ Kafka Consumer is paused │
│ Messages remain in Kafka queue │
│ System gives DB time to recover │
└─────────────────────────────────────┘
│
▼ Time expires
┌─────────────────────────────────────┐
│ AUTO RECOVERY │
│ │
│ current_time >= _pause_until ? │
│ YES: _paused = False │
│ Resume normal processing │
└─────────────────────────────────────┘
緊急煞車處理:當接收到過載異常時(對應圖中 "EMERGENCY BRAKE"):
def _handle_overload(self, error: StreamingOverloadException, source):
"""處理過載 - 緊急煞車程序"""
# 統計過載次數
self._overload_count += 1
# 設定系統狀態(對應圖中狀態變更)
self._paused = True
self._pause_until = time.time() + error.pause_seconds # 計算暫停到什麼時候
logger.warning(
f"系統過載 (第 {self._overload_count} 次)! "
f"暫停 {error.pause_seconds:.1f} 秒讓系統喘口氣..."
)
# 立即暫停 Kafka Consumer(對應圖中 "Kafka Consumer is paused")
self._pause_kafka_consumer(source)
狀態檢查與自動恢復:每次處理訊息前都會檢查(對應圖中 "AUTO RECOVERY"):
def _should_pause(self) -> bool:
"""檢查是否該暫停 - 就像看紅綠燈"""
current_time = time.time()
# 檢查當前狀態
if self._paused and current_time < self._pause_until:
return True # 還在暫停期間(對應圖中 "PAUSED STATE")
elif self._paused and current_time >= self._pause_until:
# 時間到了,自動恢復(對應圖中 "AUTO RECOVERY")
self._paused = False
logger.info("暫停結束,恢復處理訊息")
# 重要:恢復所有 Kafka Consumer
self._resume_all_kafka_consumers()
return False # 正常處理狀態
經過幾天的學習,我們的 SimpleStreamingEngine 已經從基礎的資料流處理進化為更穩定的流處理系統:
Day 7: SimpleStreamingEngine with Backpressure (帶背壓機制的流處理架構)
┌─────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ KafkaSource │───►│DataFrame.filter()│───►│PostgreSQLSink │
│ │ │ │ │ │
│ • Topic consume │ │ • Lambda filter │ │ • Batch buffer │
│ • Consumer pause│ │ • Data transform │ │ • Timer trigger │
│ • Auto resume │ │ │ │ • Bulk insert │
│ │ │ │ │ • Overload detect│
│ │ │ │ │ • Exception throw│
└─────────────────┘ └──────────────────┘ └──────────────────┘
▲ │
│ ┌─────────────────────────────────────┘
│ │
│ ▼
┌─────────────────────────────────────────────────────────────┐
│ SimpleStreamingEngine │
│ │
│ • Message handler with backpressure check │
│ • _should_pause() status management │
│ • _handle_overload() emergency brake │
│ • Kafka Consumer pause/resume control │
│ • Auto recovery after timeout │
└─────────────────────────────────────────────────────────────┘
從單純的「訊息進→處理→出」,進化為具備智能流量控制的完整系統:
背壓機制就像是給你的流處理系統請了一個聰明的調度員。他會讓整個系統跑得更穩、更持久。
記住,在高吞吐量系統的世界裡,優雅地處理過載比暴力地提升效能更重要。當你的 PostgreSQL 開始「喊累」時,讓系統聽話地慢下來,往往比強行「鞭策」它更明智。
在真實的生產環境中,背壓機制遠比我們今天實作的版本更加複雜和精細:
Apache Flink 成熟的流處理框架都內建了背壓機制:
今天我們透過簡化的實作理解了背壓機制的核心概念:
就像我們一開始說的物流平台一樣 - 最好的服務不是讓外送員累死,而是讓每個包裹都能穩定送達,讓客戶明天還願意繼續下單。
今天我們學會了讓系統在過載時「踩煞車」,但還有一個關鍵問題沒解決:如何精準追蹤處理進度?
在流處理系統中,當系統重啟或發生故障時,我們需要知道:已經處理到 Kafka 的哪個 offset?哪些數據已經成功寫入資料庫?如何避免重複處理或數據遺失?
明天我們來聊聊 手動控制 commit - 也就是處理進度管理的核心技術。我們會透過 checkpoint 機制實現: